package com.youxin.infra.io;

import com.youxin.infra.entity.ItemSink;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<ItemSink> {

    private String topic;

    public ProducerStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ItemSink element, @Nullable Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(this.topic, element.toString().getBytes(StandardCharsets.UTF_8));
    }
}
